40c9c468IienauFHQ_xJIcqnPJ8giQ tools/python/xen/util/ip.py
41dde8b0yuJX-S79w4xJKxBQ-Mhp1A tools/python/xen/util/memmap.py
4059c6a0pnxhG8hwSOivXybbGOwuXw tools/python/xen/util/tempfile.py
+4267a9b16u4IEPhjRryesk6A17sobA tools/python/xen/web/SrvBase.py
+4267a9b1FfCUjW7m9anLERcx9lwhJg tools/python/xen/web/SrvDir.py
+4267a9b1uMXIfzB6-81ZLqMCyTgJmw tools/python/xen/web/__init__.py
+4267a9b1i_zVq36tt2iQejVuR6DGFw tools/python/xen/web/connection.py
+4267a9b1Z2SpO9v-zEDApywETZPDwA tools/python/xen/web/defer.py
+4267a9b1KzSWZwWKYrGRc9bUhow_7Q tools/python/xen/web/http.py
+4267a9b1KWNZhhmZnySe_nLASwO47g tools/python/xen/web/httpserver.py
+4267a9b21miObgEJLAgtLTAKRBK8uQ tools/python/xen/web/protocol.py
+4267a9b2pA22-lF37dB7XfapMNroGw tools/python/xen/web/reactor.py
+4267a9b2AbH-azu7SXIUETXC39tu-A tools/python/xen/web/resource.py
+4267a9b21XhDCpkVXtgea3ko8uS16g tools/python/xen/web/static.py
+4267a9b2q7UA0cU5-KATCWX6O-TKsA tools/python/xen/web/tcp.py
+4267a9b2XqvzKDWxfAdV22c3mO6NHA tools/python/xen/web/unix.py
40c9c468SNuObE_YWARyS0hzTPSzKg tools/python/xen/xend/Args.py
41597996WNvJA-DVCBmc0xU9w_XmoA tools/python/xen/xend/Blkctl.py
40c9c468Um_qc66OQeLEceIz1pgD5g tools/python/xen/xend/EventServer.py
40c9c468IxQabrKJSWs0aEjl-27mRQ tools/python/xen/xend/server/SrvConsole.py
40c9c4689Io5bxfbYIfRiUvsiLX0EQ tools/python/xen/xend/server/SrvConsoleDir.py
40c9c468woSmBByfeXA4o_jGf2gCgA tools/python/xen/xend/server/SrvDaemon.py
-40c9c468kACsmkqjxBWKHRo071L26w tools/python/xen/xend/server/SrvDeviceDir.py
40c9c468EQZJVkCLds-OhesJVVyZbQ tools/python/xen/xend/server/SrvDir.py
40eee3a0m38EwYXfCSFIjWNwG6jx_A tools/python/xen/xend/server/SrvDmesg.py
40c9c468TyHZUq8sk0FF_vxM6Sozrg tools/python/xen/xend/server/SrvDomain.py
40c9c469N2-b3GqpLHHHPZykJPLVvA tools/python/xen/xend/server/channel.py
40c9c469hJ_IlatRne-9QEa0-wlquw tools/python/xen/xend/server/console.py
40c9c469UcNJh_NuLU0ytorM0Lk5Ow tools/python/xen/xend/server/controller.py
-40d83983OXjt-y3HjSCcuoPp9rzvmw tools/python/xen/xend/server/domain.py
4266169exkN9o3hA8vxe8Er0BZv1Xw tools/python/xen/xend/server/event.py
40c9c469yrm31i60pGKslTi2Zgpotg tools/python/xen/xend/server/messages.py
40c9c46925x-Rjb0Cv2f1-l2jZrPYg tools/python/xen/xend/server/netif.py
'xen.xend.server',
'xen.sv',
'xen.xm',
+ 'xen.web',
],
ext_package = "xen.lowlevel",
ext_modules = [ xc, xu ]
--- /dev/null
+# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
+
+import types
+
+from xen.xend import sxp
+from xen.xend import PrettyPrint
+from xen.xend.Args import ArgError
+from xen.xend.XendError import XendError
+from xen.xend.XendLogging import log
+
+import resource
+import http
+import defer
+
+def uri_pathlist(p):
+ """Split a path into a list.
+ p path
+ return list of path elements
+ """
+ l = []
+ for x in p.split('/'):
+ if x == '': continue
+ l.append(x)
+ return l
+
+class SrvBase(resource.Resource):
+ """Base class for services.
+ """
+
+
+ def use_sxp(self, req):
+ """Determine whether to send an SXP response to a request.
+ Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept.
+
+ req request
+ returns 1 for SXP, 0 otherwise
+ """
+ ok = 0
+ user_agent = req.getHeader('User-Agent')
+ accept = req.getHeader('Accept')
+ if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0):
+ ok = 1
+ return ok
+
+ def get_op_method(self, op):
+ """Get the method for an operation.
+ For operation 'foo' looks for 'op_foo'.
+
+ op operation name
+ returns method or None
+ """
+ op_method_name = 'op_' + op
+ return getattr(self, op_method_name, None)
+
+ def perform(self, req):
+ """General operation handler for posted operations.
+ For operation 'foo' looks for a method op_foo and calls
+ it with op_foo(op, req). Replies with code 500 if op_foo
+ is not found.
+
+ The method must return a list when req.use_sxp is true
+ and an HTML string otherwise (or list).
+ Methods may also return a Deferred (for incomplete processing).
+
+ req request
+ """
+ op = req.args.get('op')
+ if op is None or len(op) != 1:
+ req.setResponseCode(http.NOT_ACCEPTABLE, "Invalid request")
+ return ''
+ op = op[0]
+ op_method = self.get_op_method(op)
+ if op_method is None:
+ req.setResponseCode(http.NOT_IMPLEMENTED, "Operation not implemented: " + op)
+ req.setHeader("Content-Type", "text/plain")
+ req.write("Operation not implemented: " + op)
+ return ''
+ else:
+ return self._perform(op, op_method, req)
+
+ def _perform(self, op, op_method, req):
+ try:
+ val = op_method(op, req)
+ except Exception, err:
+ self._perform_err(err, op, req)
+ return ''
+
+ if isinstance(val, defer.Deferred):
+ val.addCallback(self._perform_cb, op, req, dfr=1)
+ val.addErrback(self._perform_err, op, req, dfr=1)
+ return server.NOT_DONE_YET
+ else:
+ self._perform_cb(val, op, req, dfr=0)
+ return ''
+
+ def _perform_cb(self, val, op, req, dfr=0):
+ """Callback to complete the request.
+ May be called from a Deferred.
+
+ @param err: the error
+ @param req: request causing the error
+ @param dfr: deferred flag
+ """
+ if isinstance(val, resource.ErrorPage):
+ req.write(val.render(req))
+ elif self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ sxp.show(val, out=req)
+ else:
+ req.write('<html><head></head><body>')
+ self.print_path(req)
+ if isinstance(val, types.ListType):
+ req.write('<code><pre>')
+ PrettyPrint.prettyprint(val, out=req)
+ req.write('</pre></code>')
+ else:
+ req.write(str(val))
+ req.write('</body></html>')
+ if dfr:
+ req.finish()
+
+ def _perform_err(self, err, op, req, dfr=0):
+ """Error callback to complete a request.
+ May be called from a Deferred.
+
+ @param err: the error
+ @param req: request causing the error
+ @param dfr: deferred flag
+ """
+ if not (isinstance(err, ArgError) or
+ isinstance(err, sxp.ParseError) or
+ isinstance(err, XendError)):
+ if dfr:
+ return err
+ else:
+ raise
+ #log.exception("op=%s: %s", op, str(err))
+ if self.use_sxp(req):
+ req.setHeader("Content-Type", sxp.mime_type)
+ sxp.show(['xend.err', str(err)], out=req)
+ else:
+ req.setHeader("Content-Type", "text/plain")
+ req.write('Error ')
+ req.write(': ')
+ req.write(str(err))
+ if dfr:
+ req.finish()
+
+
+ def print_path(self, req):
+ """Print the path with hyperlinks.
+ """
+ pathlist = [x for x in req.prepath if x != '' ]
+ s = "/"
+ req.write('<h1><a href="/">/</a>')
+ for x in pathlist:
+ s += x + "/"
+ req.write(' <a href="%s">%s</a>/' % (s, x))
+ req.write("</h1>")
+
--- /dev/null
+# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
+
+import types
+
+from xen.xend import sxp
+from xen.xend import PrettyPrint
+from xen.xend.Args import ArgError
+from xen.xend.XendError import XendError
+#from xen.xend.XendLogging import log
+
+import resource
+import http
+
+from xen.web.SrvBase import SrvBase
+
+class SrvConstructor:
+ """Delayed constructor for sub-servers.
+ Does not import the sub-server class or create the object until needed.
+ """
+
+ def __init__(self, klass):
+ """Create a constructor. It is assumed that the class
+ should be imported as 'import klass from klass'.
+
+ klass name of its class
+ """
+ self.klass = klass
+ self.obj = None
+
+ def getobj(self):
+ """Get the sub-server object, importing its class and instantiating it if
+ necessary.
+ """
+ if not self.obj:
+ exec 'from xen.xend.server.%s import %s' % (self.klass, self.klass)
+ klassobj = eval(self.klass)
+ self.obj = klassobj()
+ return self.obj
+
+class SrvDir(SrvBase):
+ """Base class for directory servlets.
+ """
+ isLeaf = False
+
+ def __init__(self):
+ SrvBase.__init__(self)
+ self.table = {}
+ self.order = []
+
+ def __repr__(self):
+ return "<SrvDir %x %s>" %(id(self), self.table.keys())
+
+ def noChild(self, msg):
+ return resource.ErrorPage(http.NOT_FOUND, msg=msg)
+
+ def getChild(self, x, req):
+ if x == '': return self
+ try:
+ val = self.get(x)
+ except XendError, ex:
+ return self.noChild(str(ex))
+ if val is None:
+ return self.noChild('Not found: ' + str(x))
+ else:
+ return val
+
+ def get(self, x):
+ val = self.table.get(x)
+ if isinstance(val, SrvConstructor):
+ val = val.getobj()
+ return val
+
+ def add(self, x, v=None):
+ if v is None:
+ v = 'SrvDir'
+ if isinstance(v, types.StringType):
+ v = SrvConstructor(v)
+ self.table[x] = v
+ self.order.append(x)
+ return v
+
+ def render_GET(self, req):
+ try:
+ if self.use_sxp(req):
+ req.setHeader("Content-type", sxp.mime_type)
+ self.ls(req, 1)
+ else:
+ req.write('<html><head></head><body>')
+ self.print_path(req)
+ self.ls(req)
+ self.form(req)
+ req.write('</body></html>')
+ return ''
+ except Exception, ex:
+ self._perform_err(ex, "GET", req)
+
+ def ls(self, req, use_sxp=0):
+ url = req.prePathURL()
+ if not url.endswith('/'):
+ url += '/'
+ if use_sxp:
+ req.write('(ls ')
+ for k in self.order:
+ req.write(' ' + k)
+ req.write(')')
+ else:
+ req.write('<ul>')
+ for k in self.order:
+ v = self.get(k)
+ req.write('<li><a href="%s%s">%s</a></li>'
+ % (url, k, k))
+ req.write('</ul>')
+
+ def form(self, req):
+ pass
--- /dev/null
+import sys
+import threading
+import select
+import socket
+
+from errno import EAGAIN, EINTR, EWOULDBLOCK
+
+"""General classes to support server and client sockets, without
+specifying what kind of socket they are. There are subclasses
+for TCP and unix-domain sockets (see tcp.py and unix.py).
+"""
+
+"""We make sockets non-blocking so that operations like accept()
+don't block. We also select on a timeout. Otherwise we have no way
+of getting the threads to shutdown.
+"""
+SELECT_TIMEOUT = 2.0
+
+class SocketServerConnection:
+ """An accepted connection to a server.
+ """
+
+ def __init__(self, sock, protocol, addr, server):
+ self.sock = sock
+ self.protocol = protocol
+ self.addr = addr
+ self.server = server
+ self.buffer_n = 1024
+ self.thread = None
+ self.connected = True
+ protocol.setTransport(self)
+ protocol.connectionMade(addr)
+
+ def run(self):
+ self.thread = threading.Thread(target=self.main)
+ #self.thread.setDaemon(True)
+ self.thread.start()
+
+ def main(self):
+ while True:
+ if not self.thread: break
+ if self.select(): break
+ if not self.thread: break
+ data = self.read()
+ if data is None: continue
+ if data is True: break
+ if self.dataReceived(data): break
+
+ def select(self):
+ try:
+ select.select([self.sock], [], [], SELECT_TIMEOUT)
+ return False
+ except socket.error, ex:
+ if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
+ return False
+ else:
+ self.loseConnection(ex)
+ return True
+
+ def read(self):
+ try:
+ data = self.sock.recv(self.buffer_n)
+ if data == '':
+ self.loseConnection()
+ return True
+ return data
+ except socket.error, ex:
+ if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
+ return None
+ else:
+ self.loseConnection(ex)
+ return True
+
+ def dataReceived(self, data):
+ if not self.protocol:
+ return True
+ try:
+ self.protocol.dataReceived(data)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.disconnect(ex)
+ return True
+ return False
+
+ def write(self, data):
+ self.sock.send(data)
+
+ def loseConnection(self, reason=None):
+ self.thread = None
+ self.closeSocket(reason)
+ self.closeProtocol(reason)
+
+ def closeSocket(self, reason):
+ try:
+ self.sock.close()
+ except SystemExit:
+ raise
+ except:
+ pass
+
+ def closeProtocol(self, reason):
+ try:
+ if self.connected:
+ self.connected = False
+ if self.protocol:
+ self.protocol.connectionLost(reason)
+ except SystemExit:
+ raise
+ except:
+ pass
+
+ def getHost(self):
+ return self.sock.getsockname()
+
+ def getPeer(self):
+ return self.addr
+
+class SocketListener:
+ """A server socket, running listen in a thread.
+ Accepts connections and runs a thread for each one.
+ """
+
+ def __init__(self, factory, backlog=None):
+ if backlog is None:
+ backlog = 5
+ self.factory = factory
+ self.sock = None
+ self.backlog = backlog
+ self.thread = None
+
+ def createSocket(self):
+ raise NotImplementedError()
+
+ def acceptConnection(self, sock, protocol, addr):
+ return SocketServerConnection(sock, protocol, addr, self)
+
+ def startListening(self):
+ if self.sock or self.thread:
+ raise IOError("already listening")
+ self.sock = self.createSocket()
+ self.sock.setblocking(0)
+ self.sock.listen(self.backlog)
+ self.run()
+
+ def stopListening(self, reason=None):
+ self.loseConnection(reason)
+
+ def run(self):
+ self.factory.doStart()
+ self.thread = threading.Thread(target=self.main)
+ #self.thread.setDaemon(True)
+ self.thread.start()
+
+ def main(self):
+ while True:
+ if not self.thread: break
+ if self.select(): break
+ if self.accept(): break
+
+ def select(self):
+ try:
+ select.select([self.sock], [], [], SELECT_TIMEOUT)
+ return False
+ except socket.error, ex:
+ if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
+ return False
+ else:
+ self.loseConnection(ex)
+ return True
+
+ def accept(self):
+ try:
+ (sock, addr) = self.sock.accept()
+ sock.setblocking(0)
+ return self.accepted(sock, addr)
+ except socket.error, ex:
+ if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
+ return False
+ else:
+ self.loseConnection(ex)
+ return True
+
+ def accepted(self, sock, addr):
+ protocol = self.factory.buildProtocol(addr)
+ if protocol is None:
+ self.loseConnection()
+ return True
+ connection = self.acceptConnection(sock, protocol, addr)
+ connection.run()
+ return False
+
+ def loseConnection(self, reason=None):
+ self.thread = None
+ self.closeSocket(reason)
+ self.closeFactory(reason)
+
+ def closeSocket(self, reason):
+ try:
+ self.sock.close()
+ except SystemExit:
+ raise
+ except Exception, ex:
+ pass
+
+ def closeFactory(self, reason):
+ try:
+ self.factory.doStop()
+ except SystemExit:
+ raise
+ except:
+ pass
+
+class SocketClientConnection:
+ """A connection to a server from a client.
+
+ Call connectionMade() on the protocol in a thread when connected.
+ It is completely up to the protocol what to do.
+ """
+
+ def __init__(self, connector):
+ self.addr = None
+ self.connector = connector
+ self.buffer_n = 1024
+ self.connected = False
+
+ def createSocket (self):
+ raise NotImplementedError()
+
+ def write(self, data):
+ if self.sock:
+ return self.sock.send(data)
+ else:
+ return 0
+
+ def connect(self, timeout):
+ #todo: run a timer to cancel on timeout?
+ try:
+ sock = self.createSocket()
+ sock.connect(self.addr)
+ self.sock = sock
+ self.connected = True
+ self.protocol = self.connector.buildProtocol(self.addr)
+ self.protocol.setTransport(self)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.connector.connectionFailed(ex)
+ return False
+
+ self.thread = threading.Thread(target=self.main)
+ #self.thread.setDaemon(True)
+ self.thread.start()
+ return True
+
+ def main(self):
+ try:
+ # Call the protocol in a thread.
+ # Up to it what to do.
+ self.protocol.connectionMade(self.addr)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.disconnect(ex)
+
+ def mainLoop(self):
+ # Something a protocol could call.
+ while True:
+ if not self.thread: break
+ if self.select(): break
+ if not self.thread: break
+ data = self.read()
+ if data is None: continue
+ if data is True: break
+ if self.dataReceived(data): break
+
+ def select(self):
+ try:
+ select.select([self.sock], [], [], SELECT_TIMEOUT)
+ return False
+ except socket.error, ex:
+ if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
+ return False
+ else:
+ self.disconnect(ex)
+ return True
+
+ def read(self):
+ try:
+ data = self.sock.recv(self.buffer_n)
+ return data
+ except socket.error, ex:
+ if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
+ return None
+ else:
+ self.disconnect(ex)
+ return True
+
+ def dataReceived(self, data):
+ if not self.protocol:
+ return True
+ try:
+ self.protocol.dataReceived(data)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ self.disconnect(ex)
+ return True
+ return False
+
+ def disconnect(self, reason=None):
+ self.thread = None
+ self.closeSocket(reason)
+ self.closeProtocol(reason)
+ self.closeConnector(reason)
+
+ def closeSocket(self, reason):
+ try:
+ if self.sock:
+ self.sock.close()
+ except SystemExit:
+ raise
+ except:
+ pass
+
+ def closeProtocol(self, reason):
+ try:
+ if self.connected:
+ self.connected = False
+ if self.protocol:
+ self.protocol.connectionLost(reason)
+ except SystemExit:
+ raise
+ except:
+ pass
+ self.protocol = None
+
+ def closeConnector(self, reason):
+ try:
+ self.connector.connectionLost(reason)
+ except SystemExit:
+ raise
+ except:
+ pass
+
+class SocketConnector:
+ """A client socket. Connects to a server and runs the client protocol
+ in a thread.
+ """
+
+ def __init__(self, factory):
+ self.factoryStarted = False
+ self.factory = factory
+ self.state = "disconnected"
+ self.transport = None
+
+ def getDestination(self):
+ raise NotImplementedError()
+
+ def connectTransport(self):
+ raise NotImplementedError()
+
+ def connect(self):
+ if self.state != "disconnected":
+ raise socket.error(EINVAL, "cannot connect in state " + self.state)
+ self.state = "connecting"
+ if not self.factoryStarted:
+ self.factoryStarted = True
+ self.factory.doStart()
+ self.factory.startedConnecting()
+ self.connectTransport()
+
+ def stopConnecting(self):
+ if self.state != "connecting":
+ return
+ self.state = "disconnected"
+ self.transport.disconnect()
+
+ def buildProtocol(self, addr):
+ return self.factory.buildProtocol(addr)
+
+ def connectionLost(self, reason=None):
+ self.factory.doStop()
+
+ def connectionFailed(self, reason=None):
+ self.factory.doStop()
+
--- /dev/null
+
+class Deferred:
+ pass
--- /dev/null
+#============================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+#============================================================================
+# Parts of this library are derived from Twisted:
+# Copyright (C) 2001 Matthew W. Lefkowitz
+#
+# Copyright (C) 2005 Mike Wray <mike.wray@hp.com>
+#============================================================================
+
+from mimetools import Message
+from cStringIO import StringIO
+import math
+import time
+import cgi
+
+CONTINUE = 100
+SWITCHING_PROTOCOLS = 101
+
+OK = 200
+CREATED = 201
+ACCEPTED = 202
+NON_AUTHORITATIVE_INFORMATION = 203
+NO_CONTENT = 204
+RESET_CONTENT = 205
+PARTIAL_CONTENT = 206
+MULTI_STATUS = 207
+
+MULTIPLE_CHOICE = 300
+MOVED_PERMANENTLY = 301
+FOUND = 302
+SEE_OTHER = 303
+NOT_MODIFIED = 304
+USE_PROXY = 305
+TEMPORARY_REDIRECT = 307
+
+BAD_REQUEST = 400
+UNAUTHORIZED = 401
+PAYMENT_REQUIRED = 402
+FORBIDDEN = 403
+NOT_FOUND = 404
+NOT_ALLOWED = 405
+NOT_ACCEPTABLE = 406
+PROXY_AUTH_REQUIRED = 407
+REQUEST_TIMEOUT = 408
+CONFLICT = 409
+GONE = 410
+LENGTH_REQUIRED = 411
+PRECONDITION_FAILED = 412
+REQUEST_ENTITY_TOO_LARGE = 413
+REQUEST_URI_TOO_LONG = 414
+UNSUPPORTED_MEDIA_TYPE = 415
+REQUESTED_RANGE_NOT_SATISFIABLE = 416
+EXPECTATION_FAILED = 417
+
+INTERNAL_SERVER_ERROR = 500
+NOT_IMPLEMENTED = 501
+BAD_GATEWAY = 502
+SERVICE_UNAVAILABLE = 503
+GATEWAY_TIMEOUT = 504
+VERSION_NOT_SUPPORTED = 505
+INSUFFICIENT_STORAGE_SPACE = 507
+NOT_EXTENDED = 510
+
+NO_BODY_CODES = [ NO_CONTENT, NOT_MODIFIED ]
+
+
+STATUS = {
+ CONTINUE : "Continue",
+ SWITCHING_PROTOCOLS : "Switching protocols",
+
+ OK : "OK",
+ CREATED : "Created",
+ ACCEPTED : "Accepted",
+ NON_AUTHORITATIVE_INFORMATION : "Non-authoritative information",
+ NO_CONTENT : "No content",
+ RESET_CONTENT : "Reset content",
+ PARTIAL_CONTENT : "Partial content",
+ MULTI_STATUS : "Multi-status",
+
+ MULTIPLE_CHOICE : "Multiple choice",
+ MOVED_PERMANENTLY : "Moved permanently",
+ FOUND : "Found",
+ SEE_OTHER : "See other",
+ NOT_MODIFIED : "Not modified",
+ USE_PROXY : "Use proxy",
+ TEMPORARY_REDIRECT : "Temporary redirect",
+
+ BAD_REQUEST : "Bad request",
+ UNAUTHORIZED : "Unauthorized",
+ PAYMENT_REQUIRED : "Payment required",
+ FORBIDDEN : "Forbidden",
+ NOT_FOUND : "Not found",
+ NOT_ALLOWED : "Not allowed",
+ NOT_ACCEPTABLE : "Not acceptable",
+ PROXY_AUTH_REQUIRED : "Proxy authentication required",
+ REQUEST_TIMEOUT : "Request timeout",
+ CONFLICT : "Conflict",
+ GONE : "Gone",
+ LENGTH_REQUIRED : "Length required",
+ PRECONDITION_FAILED : "Precondition failed",
+ REQUEST_ENTITY_TOO_LARGE : "Request entity too large",
+ REQUEST_URI_TOO_LONG : "Request URI too long",
+ UNSUPPORTED_MEDIA_TYPE : "Unsupported media type",
+ REQUESTED_RANGE_NOT_SATISFIABLE : "Requested range not satisfiable",
+ EXPECTATION_FAILED : "Expectation failed",
+
+ INTERNAL_SERVER_ERROR : "Internal server error",
+ NOT_IMPLEMENTED : "Not implemented",
+ BAD_GATEWAY : "Bad gateway",
+ SERVICE_UNAVAILABLE : "Service unavailable",
+ GATEWAY_TIMEOUT : "Gateway timeout",
+ VERSION_NOT_SUPPORTED : "HTTP version not supported",
+ INSUFFICIENT_STORAGE_SPACE : "Insufficient storage space",
+ NOT_EXTENDED : "Not extended",
+ }
+
+def getStatus(code):
+ return STATUS.get(code, "unknown")
+
+MULTIPART_FORM_DATA = 'multipart/form-data'
+URLENCODED = 'application/x-www-form-urlencoded'
+
+parseQueryArgs = cgi.parse_qs
+
+def timegm(year, month, day, hour, minute, second):
+ """Convert time tuple in GMT to seconds since epoch, GMT"""
+ EPOCH = 1970
+ assert year >= EPOCH
+ assert 1 <= month <= 12
+ days = 365*(year-EPOCH) + calendar.leapdays(EPOCH, year)
+ for i in range(1, month):
+ days = days + calendar.mdays[i]
+ if month > 2 and calendar.isleap(year):
+ days = days + 1
+ days = days + day - 1
+ hours = days*24 + hour
+ minutes = hours*60 + minute
+ seconds = minutes*60 + second
+ return seconds
+
+def stringToDatetime(dateString):
+ """Convert an HTTP date string to seconds since epoch."""
+ parts = dateString.split(' ')
+ day = int(parts[1])
+ month = int(monthname.index(parts[2]))
+ year = int(parts[3])
+ hour, min, sec = map(int, parts[4].split(':'))
+ return int(timegm(year, month, day, hour, min, sec))
+
+class HttpRequest:
+
+ http_version = (1, 1)
+
+ http_version_string = ("HTTP/%d.%d" % http_version)
+
+ max_content_length = 10000
+ max_headers = 500
+
+ request_line = None
+ request_method = None
+ request_uri = None
+ request_path = None
+ request_query = None
+ request_version = None
+ content_length = 0
+ content = None
+ etag = None
+ close_connection = True
+ response_code = 200
+ response_status = "OK"
+ response_sent = False
+ cached = False
+ last_modified = None
+
+ forceSSL = False
+
+ def __init__(self, host, rin, out):
+ self.host = host
+ self.rin = rin
+ self.out = out
+ self.request_args = {}
+ self.args = self.request_args
+ self.request_headers = {}
+ self.request_cookies = {}
+ self.response_headers = {}
+ self.response_cookies = {}
+ self.output = StringIO()
+ self.parseRequest()
+
+ def isSecure(self):
+ return self.forceSSL
+
+ def getRequestMethod(self):
+ return self.request_method
+
+ def trim(self, str, ends):
+ for end in ends:
+ if str.endswith(end):
+ str = str[ : -len(end) ]
+ break
+ return str
+
+ def requestError(self, code, msg=None):
+ self.sendError(code, msg)
+ raise ValueError(self.response_status)
+
+ def sendError(self, code, msg=None):
+ self.setResponseCode(code, msg=msg)
+ self.sendResponse()
+
+ def parseRequestVersion(self, version):
+ try:
+ if not version.startswith('HTTP/'):
+ raise ValueError
+ version_string = version.split('/', 1)[1]
+ version_codes = version_string.split('.')
+ if len(version_codes) != 2:
+ raise ValueError
+ request_version = (int(version_codes[0]), int(version_codes[1]))
+ except (ValueError, IndexError):
+ self.requestError(400, "Bad request version (%s)" % `version`)
+
+ def parseRequestLine(self):
+ line = self.trim(self.request_line, ['\r\n', '\n'])
+ line_fields = line.split()
+ n = len(line_fields)
+ if n == 3:
+ [method, uri, version] = line_fields
+ elif n == 2:
+ [method, uri] = line_fields
+ version = 'HTTP/0.9'
+ else:
+ self.requestError(BAD_REQUEST,
+ "Bad request (%s)" % `line`)
+
+ request_version = self.parseRequestVersion(version)
+
+ if request_version > (2, 0):
+ self.requestError(VERSION_NOT_SUPPORTED,
+ "HTTP version not supported (%s)" % `version`)
+ #if request_version >= (1, 1) and self.http_version >= (1, 1):
+ # self.close_connection = False
+ #else:
+ # self.close_connection = True
+
+ self.request_method = method
+ self.method = method
+ self.request_uri = uri
+ self.request_version = version
+
+ uri_query = uri.split('?')
+ if len(uri_query) == 1:
+ self.request_path = uri
+ else:
+ self.request_path = uri_query[0]
+ self.request_query = uri_query[1]
+ self.request_args = parseQueryArgs(self.request_query)
+ self.args = self.request_args
+
+
+ def parseRequestHeaders(self):
+ header_bytes = ""
+ header_count = 0
+ while True:
+ if header_count >= self.max_headers:
+ self.requestError(BAD_REQUEST,
+ "Bad request (too many headers)")
+ line = self.rin.readline()
+ header_bytes += line
+ header_count += 1
+ if line == '\r\n' or line == '\n' or line == '':
+ break
+ #print 'parseRequestHeaders>', header_bytes
+ header_input = StringIO(header_bytes)
+ self.request_headers = Message(header_input)
+
+ def parseRequestCookies(self):
+ cookie_hdr = self.getHeader("cookie")
+ if not cookie_hdr: return
+ for cookie in cookie_hdr.split(';'):
+ try:
+ cookie = cookie.lstrip()
+ (k, v) = cookie.split('=', 1)
+ self.request_cookies[k] = v
+ except ValueError:
+ pass
+
+ def parseRequestArgs(self):
+ if ((self.content is None) or
+ (self.request_method != "POST")):
+ return
+ content_type = self.getHeader('content-type')
+ if not content_type:
+ return
+ (encoding, params) = cgi.parse_header(content_type)
+ if encoding == URLENCODED:
+ xargs = cgi.parse_qs(self.content.getvalue(),
+ keep_blank_values=True)
+ elif encoding == MULTIPART_FORM_DATA:
+ xargs = cgi.parse_multipart(self.content, params)
+ else:
+ xargs = {}
+ self.request_args.update(xargs)
+
+ def getCookie(self, k):
+ return self.request_cookies[k]
+
+ def readContent(self):
+ try:
+ self.content_length = int(self.getHeader("Content-Length"))
+ except:
+ return
+ if self.content_length > self.max_content_length:
+ self.requestError(REQUEST_ENTITY_TOO_LARGE)
+ self.content = self.rin.read(self.content_length)
+ self.content = StringIO(self.content)
+ self.content.seek(0,0)
+
+ def parseRequest(self):
+ #print 'parseRequest>'
+ self.request_line = self.rin.readline()
+ self.parseRequestLine()
+ self.parseRequestHeaders()
+ self.parseRequestCookies()
+ connection_mode = self.getHeader('Connection')
+ self.setCloseConnection(connection_mode)
+ self.readContent()
+ self.parseRequestArgs()
+ #print 'parseRequest<'
+
+ def setCloseConnection(self, mode):
+ if not mode: return
+ mode = mode.lower()
+ if mode == 'close':
+ self.close_connection = True
+ elif (mode == 'keep-alive') and (self.http_version >= (1, 1)):
+ self.close_connection = False
+ #print 'setCloseConnection>', mode, self.close_connection
+
+ def getHeader(self, k, v=None):
+ return self.request_headers.get(k, v)
+
+ def getRequestMethod(self):
+ return self.request_method
+
+ def getRequestPath(self):
+ return self.request_path
+
+ def setResponseCode(self, code, status=None, msg=None):
+ self.response_code = code
+ if not status:
+ status = getStatus(code)
+ self.response_status = status
+
+ def setResponseHeader(self, k, v):
+ #print 'setResponseHeader>', k, v
+ k = k.lower()
+ self.response_headers[k] = v
+ if k == 'connection':
+ self.setCloseConnection(v)
+
+ setHeader = setResponseHeader
+
+ def setLastModified(self, when):
+ # time.time() may be a float, but the HTTP-date strings are
+ # only good for whole seconds.
+ when = long(math.ceil(when))
+ if (not self.last_modified) or (self.last_modified < when):
+ self.lastModified = when
+
+ modified_since = self.getHeader('if-modified-since')
+ if modified_since:
+ modified_since = stringToDatetime(modified_since)
+ if modified_since >= when:
+ self.setResponseCode(NOT_MODIFIED)
+ self.cached = True
+
+ def setContentType(self, ty):
+ self.setResponseHeader("Content-Type", ty)
+
+ def setEtag(self, etag):
+ if etag:
+ self.etag = etag
+
+ tags = self.getHeader("if-none-match")
+ if tags:
+ tags = tags.split()
+ if (etag in tags) or ('*' in tags):
+ if self.request_method in ("HEAD", "GET"):
+ code = NOT_MODIFIED
+ else:
+ code = PRECONDITION_FAILED
+ self.setResponseCode(code)
+ self.cached = True
+
+ def addCookie(self, k, v, expires=None, domain=None, path=None,
+ max_age=None, comment=None, secure=None):
+ cookie = v
+ if expires != None:
+ cookie += "; Expires=%s" % expires
+ if domain != None:
+ cookie += "; Domain=%s" % domain
+ if path != None:
+ cookie += "; Path=%s" % path
+ if max_age != None:
+ cookie += "; Max-Age=%s" % max_age
+ if comment != None:
+ cookie += "; Comment=%s" % comment
+ if secure:
+ cookie += "; Secure"
+ self.response_cookies[k] = cookie
+
+ def sendResponseHeaders(self):
+ if self.etag:
+ self.setResponseHeader("ETag", self.etag)
+ for (k, v) in self.response_headers.items():
+ self.send("%s: %s\r\n" % (k.capitalize(), v))
+ for (k, v) in self.response_cookies.items():
+ self.send("Set-Cookie: %s=%s\r\n" % (k, v))
+ self.send("\r\n")
+
+ def sendResponse(self):
+ #print 'sendResponse>'
+ if self.response_sent:
+ return
+ self.response_sent = True
+ send_body = self.hasBody()
+ if not self.close_connection:
+ self.setResponseHeader("Connection", "keep-alive")
+ if send_body:
+ self.output.seek(0, 0)
+ body = self.output.getvalue()
+ body_length = len(body)
+ #print 'sendResponse> body=', body_length, body
+ self.setResponseHeader("Content-Length", body_length)
+ if self.http_version > (0, 9):
+ self.send("%s %d %s\r\n" % (self.http_version_string,
+ self.response_code,
+ self.response_status))
+ self.sendResponseHeaders()
+ if send_body:
+ #print 'sendResponse> writing body'
+ self.send(body)
+
+ def write(self, data):
+ #print 'write>', data
+ self.output.write(data)
+
+ def send(self, data):
+ #print 'send>', len(data), '|%s|' % data
+ self.out.write(data)
+
+ def hasNoBody(self):
+ return ((self.request_method == "HEAD") or
+ (self.response_code in NO_BODY_CODES) or
+ self.cached)
+
+ def hasBody(self):
+ return not self.hasNoBody()
+
+ def process(self):
+ pass
+ return self.close_connection
+
+ def getRequestHostname(self):
+ """Get the hostname that the user passed in to the request.
+
+ Uses the 'Host:' header if it is available, and the
+ host we are listening on otherwise.
+ """
+ return (self.getHeader('host') or
+ socket.gethostbyaddr(self.getHostAddr())[0]
+ ).split(':')[0]
+
+ def getHost(self):
+ return self.host
+
+ def getHostAddr(self):
+ return self.host[0]
+
+ def getPort(self):
+ return self.host[1]
+
+ def setHost(self, host, port, ssl=0):
+ """Change the host and port the request thinks it's using.
+
+ This method is useful for working with reverse HTTP proxies (e.g.
+ both Squid and Apache's mod_proxy can do this), when the address
+ the HTTP client is using is different than the one we're listening on.
+
+ For example, Apache may be listening on https://www.example.com, and then
+ forwarding requests to http://localhost:8080, but we don't want HTML produced
+ to say 'http://localhost:8080', they should say 'https://www.example.com',
+ so we do::
+
+ request.setHost('www.example.com', 443, ssl=1)
+
+ """
+ self.forceSSL = ssl
+ self.received_headers["host"] = host
+ self.host = (host, port)
+
+
+
--- /dev/null
+import string
+import socket
+from urllib import quote, unquote
+
+import http
+from SrvDir import SrvDir
+
+class HttpServerRequest(http.HttpRequest):
+
+ def __init__(self, server, addr, srd, srw):
+ #print 'HttpServerRequest>', addr
+ self.server = server
+ self.prepath = ''
+ http.HttpRequest.__init__(self, addr, srd, srw)
+
+ def process(self):
+ #print 'HttpServerRequest>process', 'path=', self.request_path
+ self.prepath = []
+ self.postpath = map(unquote, string.split(self.request_path[1:], '/'))
+ res = self.getResource()
+ self.render(res)
+ self.sendResponse()
+ return self.close_connection
+
+ def prePathURL(self):
+ url_host = self.getRequestHostname()
+ port = self.getPort()
+ if self.isSecure():
+ url_proto = "https"
+ default_port = 443
+ else:
+ url_proto = "http"
+ default_port = 80
+ if port != default_port:
+ url_host += (':%d' % port)
+ url_path = quote(string.join(self.prepath, '/'))
+ return ('%s://%s/%s' % (url_proto, url_host, url_path))
+
+ def getResource(self):
+ return self.server.getResource(self)
+
+ def render(self, res):
+ #print 'HttpServerRequest>render', res
+ if res is None:
+ self.sendError(http.NOT_FOUND)
+ else:
+ res.render(self)
+
+class HttpServer:
+
+ request_queue_size = 5
+
+ def __init__(self, interface='', port=8080, root=None):
+ if root is None:
+ root = SrvDir()
+ self.interface = interface
+ self.port = port
+ self.closed = False
+ self.root = root
+
+ def getRoot(self):
+ return self.root
+
+ def getPort(self):
+ return self.port
+
+ def run(self):
+ self.bind()
+ self.listen()
+ self.requestLoop()
+
+ def stop(self):
+ self.close()
+
+ def bind(self):
+ #print 'bind>', self.interface, self.port
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.bind((self.interface, self.port))
+
+ def listen(self):
+ self.socket.listen(self.request_queue_size)
+
+ def accept(self):
+ return self.socket.accept()
+
+ def requestLoop(self):
+ while not self.closed:
+ self.acceptRequest()
+
+ def close(self):
+ self.closed = True
+ try:
+ self.socket.close()
+ except:
+ pass
+
+ def acceptRequest(self):
+ #print 'acceptRequest>'
+ try:
+ (sock, addr) = self.accept()
+ #print 'acceptRequest>', sock, addr
+ self.processRequest(sock, addr)
+ except socket.error:
+ return
+
+ def processRequest(self, sock, addr):
+ #print 'processRequest>', sock, addr
+ srd = sock.makefile('rb')
+ srw = sock.makefile('wb')
+ srvaddr = (socket.gethostname(), self.port)
+ while True:
+ #print 'HttpServerRequest...'
+ req = HttpServerRequest(self, srvaddr, srd, srw)
+ close = req.process()
+ srw.flush()
+ #print 'HttpServerRequest close=', close
+ if close:
+ break
+ try:
+ #print 'close...'
+ sock.close()
+ except:
+ pass
+ #print 'processRequest<', sock, addr
+
+ def getResource(self, req):
+ return self.root.getRequestResource(req)
+
+
+def main():
+ root = SrvDir()
+ a = root.add("a", SrvDir())
+ b = root.add("b", SrvDir())
+ server = HttpServer(root=root)
+ server.run()
+
+if __name__ == "__main__":
+ main()
+
+
+
+
+
--- /dev/null
+class Factory:
+
+ def __init__(self):
+ pass
+
+ def startedConnecting(self):
+ print 'ServerProtocolFactory>startedConnecting>'
+ pass
+
+ def doStart(self):
+ print 'ServerProtocolFactory>doStart>'
+ pass
+
+ def doStop(self):
+ print 'ServerProtocolFactory>doStop>'
+ pass
+
+ def buildProtocol(self, addr):
+ print 'ServerProtocolFactory>buildProtocol>', addr
+ return Protocol(self)
+
+class ServerFactory(Factory):
+ pass
+
+class ClientFactory(Factory):
+ pass
+
+class Protocol:
+
+ factory = None
+ transport = None
+ connected = False
+
+ def __init__(self, factory):
+ self.factory = factory
+
+ def setTransport(self, transport):
+ self.transport = transport
+ self.connected = bool(transport)
+
+ def getTransport(self):
+ return self.transport
+
+ def connectionMade(self, addr):
+ print 'Protocol>connectionMade>', addr
+ pass
+
+ def connectionLost(self, reason=None):
+ print 'Protocol>connectionLost>', reason
+ pass
+
+ def dataReceived(self, data):
+ print 'Protocol>dataReceived>'
+ pass
+
+ def write(self, data):
+ if self.transport:
+ return self.transport.write(data)
+ else:
+ return 0
+
+ def read(self):
+ if self.transport:
+ return self.transport.read()
+ else:
+ return None
+
+class TestClientFactory(Factory):
+
+ def buildProtocol(self, addr):
+ print 'TestClientProtocolFactory>buildProtocol>', addr
+ return TestClientProtocol(self)
+
+class TestClientProtocol(Protocol):
+
+ def connectionMade(self, addr):
+ print 'TestProtocol>connectionMade>', addr
+ self.write("hello")
+ self.write("there")
+
+class TestServerFactory(Factory):
+
+ def buildProtocol(self, addr):
+ print 'TestServerProtocolFactory>buildProtocol>', addr
+ return TestServerProtocol(self)
+
+class TestServerProtocol(Protocol):
+
+ def dataReceived(self, data):
+ print 'TestServerProtocol>dataReceived>', len(data), data
+ #sys.exit(0)
+ import os
+ os._exit(0)
+
--- /dev/null
+from threading import Timer
+
+from unix import listenUNIX, connectUNIX
+from tcp import listenTCP, connectTCP
+
+def callLater(_delay, _fn, *args, **kwds):
+ timer = Timer(_delay, _fn, args=args, kwargs=kwds)
+ timer.start()
+ return timer
--- /dev/null
+import http
+
+def findResource(resource, request):
+ """Traverse resource tree to find who will handle the request."""
+ while request.postpath and not resource.isLeaf:
+ #print 'findResource:', resource, request.postpath
+ pathElement = request.postpath.pop(0)
+ request.prepath.append(pathElement)
+ next = resource.getPathResource(pathElement, request)
+ if not next: break
+ resource = next
+ return resource
+
+class Resource:
+
+ isLeaf = False
+
+ def __init__(self):
+ self.children = {}
+
+ def getRequestResource(self, req):
+ return findResource(self, req)
+
+ def getChild(self, path, request):
+ return None
+
+ def getPathResource(self, path, request):
+ #print 'getPathResource>', self, path
+ if self.children.has_key(path):
+ val = self.children[path]
+ else:
+ val = self.getChild(path, request)
+ #print 'getPathResource<', val
+ return val
+
+ def putChild(self, path, child):
+ self.children[path] = child
+ #child.server = self.server
+
+ def render(self, req):
+ meth = getattr(self, 'render_' + req.getRequestMethod(), self.unsupported)
+ return meth(req)
+
+ def supportedMethods(self):
+ l = []
+ s = 'render_'
+ for x in dir(self):
+ if x.startswith(s):
+ l.append(x[len(s):])
+ return l
+
+ def render_HEAD(self, req):
+ return self.render_GET(req)
+
+ def render_GET(self, req):
+ req.setContentType("text/plain")
+ req.write("GET")
+
+ def render_POST(self, req):
+ req.setContentType("text/plain")
+ req.write("POST")
+
+ def unsupported(self, req):
+ req.setHeader("Accept", ",".join(self.supportedMethods()))
+ req.setResponseCode(http.NOT_IMPLEMENTED)
+ req.setContentType("text/plain")
+ req.write("Request method not supported (%s)" % req.getRequestMethod())
+
+class ErrorPage(Resource):
+
+ isLeaf = True
+
+ def __init__(self, code, status=None, msg=None):
+ Resource.__init__(self)
+ if status is None:
+ status = http.getStatus(code)
+ if msg is None:
+ msg = status
+ self.code = code
+ self.status = status
+ self.msg = msg
+
+ def render(self, req):
+ req.setResponseCode(self.code, self.status)
+ req.setContentType("text/plain")
+ req.write(self.msg)
+
+
+
+
+
--- /dev/null
+import os
+
+from resource import Resource
+
+class File(Resource):
+
+ isLeaf = True
+
+ def __init__(self, filename, defaultType=None):
+ if defaultType is None:
+ defaultType = "text/plain"
+ self.filename = filename
+ self.type = defaultType
+ self.encoding = None
+
+ def getFileSize(self):
+ try:
+ info = os.stat(self.filename)
+ return info.st_size
+ except:
+ return 0
+
+ def render(self, req):
+ if self.type:
+ req.setHeader('Content-Type', self.type)
+ if self.encoding:
+ rew.setHeader('Content-Encoding', self.encoding)
+ req.setHeader('Content-Length', self.getFileSize())
+ try:
+ io = file(self.filename, "r")
+ while True:
+ buf = io.read(1024)
+ if not buf:
+ break
+ req.write(buf)
+ except IOError:
+ pass
+ try:
+ if io:
+ io.close()
+ except:
+ pass
+ return ''
+
+
+
--- /dev/null
+import sys
+import socket
+import types
+
+from connection import *
+from protocol import *
+
+class TCPServerConnection(SocketServerConnection):
+ pass
+
+class TCPListener(SocketListener):
+
+ def __init__(self, port, factory, backlog=None, interface=''):
+ SocketListener.__init__(self, factory, backlog=backlog)
+ self.port = port
+ self.interface = interface
+
+ def createSocket(self):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ addr = (self.interface, self.port)
+ sock.bind(addr)
+ return sock
+
+ def acceptConnection(self, sock, protocol, addr):
+ return TCPServerConnection(sock, protocol, addr, self)
+
+class TCPClientConnection(SocketClientConnection):
+
+ def __init__(self, host, port, bindAddress, connector):
+ SocketClientConnection.__init__(self, connector)
+ self.addr = (host, port)
+ self.bindAddress = bindAddress
+
+ def createSocket(self):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if self.bindAddress is not None:
+ sock.bind(self.bindAddress)
+ return sock
+
+class TCPConnector(SocketConnector):
+
+ def __init__(self, host, port, factory, timeout=None, bindAddress=None):
+ SocketConnector.__init__(self, factory)
+ self.host = host
+ self.port = self.servicePort(port)
+ self.bindAddress = bindAddress
+ self.timeout = timeout
+
+ def servicePort(self, port):
+ if isinstance(port, types.StringTypes):
+ try:
+ port = socket.getservbyname(port, 'tcp')
+ except socket.error, ex:
+ raise IOError("unknown service: " + ex)
+ return port
+
+ def getDestination(self):
+ return (self.host, self.port)
+
+ def connectTransport(self):
+ self.transport = TCPClientConnection(
+ self.host, self.port, self.bindAddress, self)
+ self.transport.connect(self.timeout)
+
+def listenTCP(port, factory, interface='', backlog=None):
+ l = TCPListener(port, factory, interface=interface, backlog=backlog)
+ l.startListening()
+ return l
+
+def connectTCP(host, port, factory, timeout=None, bindAddress=None):
+ c = TCPConnector(host, port, factory, timeout=timeout, bindAddress=bindAddress)
+ c.connect()
+ return c
+
+def main(argv):
+ host = 'localhost'
+ port = 8005
+ if argv[1] == "client":
+ c = connectTCP(host, port, TestClientFactory())
+ print 'client:', c
+ else:
+ s = listenTCP(port, TestServerFactory())
+ print 'server:', s
+
+if __name__ == "__main__":
+ main(sys.argv)
+
+
+
--- /dev/null
+import sys
+import socket
+import os
+
+from connection import *
+from protocol import *
+
+class UnixServerConnection(SocketServerConnection):
+ pass
+
+class UnixListener(SocketListener):
+
+ def __init__(self, path, factory, backlog=None):
+ SocketListener.__init__(self, factory, backlog=backlog)
+ self.path = path
+
+ def createSocket(self):
+ try:
+ os.unlink(self.path)
+ except SystemExit:
+ raise
+ except Exception, ex:
+ pass
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(self.path)
+ return sock
+
+ def acceptConnection(self, sock, protocol, addr):
+ return UnixServerConnection(sock, protocol, addr, self)
+
+class UnixClientConnection(SocketClientConnection):
+
+ def __init__(self, addr, connector):
+ SocketClientConnection.__init__(self, connector)
+ self.addr = addr
+
+ def createSocket(self):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ return sock
+
+class UnixConnector(SocketConnector):
+
+ def __init__(self, path, factory, timeout=None):
+ SocketConnector.__init__(self, factory)
+ self.addr = path
+ self.timeout = timeout
+
+ def getDestination(self):
+ return self.addr
+
+ def connectTransport(self):
+ self.transport = UnixClientConnection(self.addr, self)
+ self.transport.connect(self.timeout)
+
+def listenUNIX(path, factory, backlog=None):
+ l = UnixListener(path, factory, backlog=backlog)
+ l.startListening()
+ return l
+
+def connectUNIX(path, factory, timeout=None):
+ c = UnixConnector(path, factory, timeout=timeout)
+ c.connect()
+ return c
+
+def main(argv):
+ path = "/tmp/test-foo"
+ if argv[1] == "client":
+ c = connectUNIX(path, TestClientFactory())
+ print "client:", c
+ else:
+ s = listenUNIX(path, TestServeractory())
+ print "server:", s
+
+if __name__ == "__main__":
+ main(sys.argv)
+
"""
import string
+from threading import Lock
-from twisted.internet import reactor
+#from twisted.internet import reactor
+from xen.web import reactor
# subscribe a.b.c h: map a.b.c -> h
# subscribe a.b.* h: map a.b.* -> h
self.handlers = {}
self.run = run
self.queue = []
+ self.lock = Lock()
def start(self):
"""Enable event handling. Sends any queued events.
"""
- self.run = 1
- for (e,v) in self.queue:
+ try:
+ self.lock.acquire()
+ self.run = 1
+ queue = self.queue
+ self.queue = []
+ finally:
+ self.lock.release()
+ for (e,v) in queue:
self.inject(e, v)
- self.queue = []
def stop(self):
"""Suspend event handling. Events injected while suspended
are queued until we are started again.
"""
- self.run = 0
+ try:
+ self.lock.acquire()
+ self.run = 0
+ finally:
+ self.lock.release()
def subscribe(self, event, handler):
"""Subscribe to an event. For example 'a.b.c.d'.
event event name
handler event handler fn(event, val)
"""
- hl = self.handlers.get(event)
- if hl is None:
- self.handlers[event] = [handler]
- else:
- hl.append(handler)
+ try:
+ self.lock.acquire()
+ hl = self.handlers.get(event)
+ if hl is None:
+ self.handlers[event] = [handler]
+ else:
+ hl.append(handler)
+ finally:
+ self.lock.release()
def unsubscribe_all(self, event=None):
"""Unsubscribe all handlers for a given event, or all handlers.
event event (optional)
"""
- if event == None:
- self.handlers.clear()
- elif event in self.handlers:
- del self.handlers[event]
+ try:
+ self.lock.acquire()
+ if event == None:
+ self.handlers.clear()
+ elif event in self.handlers:
+ del self.handlers[event]
+ finally:
+ self.lock.release()
def unsubscribe(self, event, handler):
"""Unsubscribe a given event and handler.
event event
handler handler
"""
- hl = self.handlers.get(event)
- if hl is None:
- return
- if handler in hl:
- hl.remove(handler)
+ try:
+ self.lock.acquire()
+ hl = self.handlers.get(event)
+ if hl is None:
+ return
+ if handler in hl:
+ hl.remove(handler)
+ finally:
+ self.lock.release()
def inject(self, event, val, async=1):
"""Inject an event. Handlers for it are called if running, otherwise
event event type
val event value
"""
- if self.run:
- if async:
- reactor.callLater(0, self.call_handlers, event, val)
- else:
- self.notify_handlers(event, val)
+ try:
+ self.lock.acquire()
+ if not self.run:
+ self.queue.append( (event, val) )
+ return
+ finally:
+ self.lock.release()
+
+ if async:
+ reactor.callLater(0, self.call_handlers, event, val)
else:
- self.queue.append( (event, val) )
+ self.notify_handlers(event, val)
def call_handlers(self, event, val):
"""Internal method to call event handlers.
event event type
val event value
"""
- hl = self.handlers.get(key)
- if hl is None:
- return
- # Copy the handler list so that handlers can call
- # subscribe/unsubscribe safely - python list iteration
- # is not safe against list modification.
- for h in hl[:]:
+ try:
+ self.lock.acquire()
+ hl = self.handlers.get(key)
+ if hl is None:
+ return
+ # Copy the handler list so that handlers can call
+ # subscribe/unsubscribe safely - python list iteration
+ # is not safe against list modification.
+ hl = hl[:]
+ finally:
+ self.lock.release()
+ # Must not hold the lock while calling the handlers.
+ for h in hl:
try:
h(event, val)
except:
if domid in doms:
try:
self._new_domain(config, doms[domid])
+ self.update_domain(domid)
except Exception, ex:
log.exception("Error recreating domain info: id=%s", domid)
self._delete_domain(domid)
destroyed = 0
for d in casualties:
id = str(d['dom'])
- print 'reap>', id
+ #print 'reap>', id
dominfo = self.domain_by_id.get(id)
name = (dominfo and dominfo.name) or '??'
if dominfo and dominfo.is_terminated():
- print 'reap> already terminated:', id
+ #print 'reap> already terminated:', id
continue
log.debug('XendDomain>reap> domain died name=%s id=%s', name, id)
if d['shutdown']:
@param devconfig: device configuration
"""
dominfo = self.domain_lookup(id)
- self.refresh_schedule()
val = dominfo.device_create(devconfig)
self.update_domain(dominfo.id)
+ self.refresh_schedule()
return val
def domain_device_configure(self, id, devconfig, idx):
@return: updated device configuration
"""
dominfo = self.domain_lookup(id)
- self.refresh_schedule()
val = dominfo.device_configure(devconfig, idx)
self.update_domain(dominfo.id)
+ self.refresh_schedule()
return val
def domain_device_refresh(self, id, type, idx):
@param type: device type
"""
dominfo = self.domain_lookup(id)
- self.refresh_schedule()
val = dominfo.device_refresh(type, idx)
self.update_domain(dominfo.id)
+ self.refresh_schedule()
return val
def domain_device_destroy(self, id, type, idx):
@param type: device type
"""
dominfo = self.domain_lookup(id)
- self.refresh_schedule()
val = dominfo.device_destroy(type, idx)
self.update_domain(dominfo.id)
+ self.refresh_schedule()
return val
def domain_devtype_ls(self, id, type):
@return: device indexes
"""
dominfo = self.domain_lookup(id)
- return dominfo.get_devices(type)
+ return dominfo.getDeviceIndexes(type)
def domain_devtype_get(self, id, type, idx):
"""Get a device from a domain.
@return: device object (or None)
"""
dominfo = self.domain_lookup(id)
- return dominfo.get_device_by_index(type, idx)
+ return dominfo.getDeviceByIndex(type, idx)
def domain_vif_credit_limit(self, id, vif, credit, period):
"""Limit the vif's transmission rate
"""
dominfo = self.domain_lookup(id)
- try:
- return dominfo.limit_vif(vif, credit, period)
- except Exception, ex:
- raise XendError(str(ex))
+ dev = dominfo.getDeviceById('vif', vif)
+ if not dev:
+ raise XendError("invalid vif")
+ return dev.setCreditLimit(credit, period)
def domain_vif_ls(self, id):
"""Get list of virtual network interface (vif) indexes for a domain.
Includes support for domain construction, using
open-ended configurations.
-Author: Mike Wray <mike.wray@hpl.hp.com>
+Author: Mike Wray <mike.wray@hp.com>
"""
from XendLogging import log
from XendError import VmError
from XendRoot import get_component
-#import XendConsole; xendConsole = XendConsole.instance()
from PrettyPrint import prettyprint
-"""The length of domain names that Xen can handle.
-The names stored in Xen itself are not used for much, and
-xend can handle domain names of any length.
-"""
-MAX_DOMAIN_NAME = 15
-
"""Flag for a block device backend domain."""
SIF_BLK_BE_DOMAIN = (1<<4)
self.channel = None
self.controllers = {}
- self.devices = {}
self.configs = []
ctrl = self.getDeviceController(type)
return ctrl.getDeviceByIndex(idx)
- def getDeviceIndex(self, type, dev):
- ctrl = self.getDeviceController(type)
- return ctrl.getDeviceIndex(dev)
-
def getDeviceConfig(self, type, id):
ctrl = self.getDeviceController(type)
return ctrl.getDeviceConfig(id)
ctrl = self.getDeviceController(type)
return ctrl.getDeviceIds()
+ def getDeviceIndexes(self, type):
+ ctrl = self.getDeviceController(type)
+ return ctrl.getDeviceIndexes()
+
def getDeviceConfigs(self, type):
ctrl = self.getDeviceController(type)
return ctrl.getDeviceConfigs()
return sxpr
def sxpr_devices(self):
- sxpr = ['devices']
+ sxpr = []
for ty in self.getDeviceTypes():
- devs = [ ty ]
- devs += self.getDeviceSxprs(ty)
- sxpr.append(devs)
+ devs = self.getDeviceSxprs(ty)
+ sxpr += devs
+ if sxpr:
+ sxpr.insert(0, 'devices')
+ else:
+ sxpr = None
return sxpr
def check_name(self, name):
- """Check if a vm name is valid. Valid names start with a non-digit
- and contain alphabetic characters, digits, or characters in '_-.:/+'.
+ """Check if a vm name is valid. Valid names contain alphabetic characters,
+ digits, or characters in '_-.:/+'.
The same name cannot be used for more than one vm at the same time.
@param name: name
if self.recreate: return
if name is None or name == '':
raise VmError('missing vm name')
- if name[0] in string.digits:
- raise VmError('invalid vm name')
for c in name:
if c in string.digits: continue
if c in '_-.:/+': continue
val = None
if self.savedinfo is None:
return val
- devinfo = sxp.child(self.savedinfo, 'devices')
- if devinfo is None:
- return val
- devs = sxp.child(devinfo, type)
- if devs is None:
+ devices = sxp.child(self.savedinfo, 'devices')
+ if devices is None:
return val
index = str(index)
- for d in sxp.children(devs):
+ for d in sxp.children(devices, type):
dindex = sxp.child_value(d, 'index')
if dindex is None: continue
if str(dindex) == index:
def get_device_recreate(self, type, index):
return self.get_device_savedinfo(type, index) or self.recreate
- def limit_vif(self, vif, credit, period):
- """Limit the rate of a virtual interface
- @param vif: vif
- @param credit: vif credit in bytes
- @param period: vif period in uSec
- @return: 0 on success
- """
- #todo: all wrong
- #ctrl = xend.netif_create(self.dom, recreate=self.recreate)
- #d = ctrl.limitDevice(vif, credit, period)
- #return d
- pass
-
def add_config(self, val):
"""Add configuration data to a virtual machine.
if ctrl.isDestroyed(): continue
ctrl.destroyController(reboot=reboot)
if not reboot:
- self.devices = {}
- self.device_index = {}
self.configs = []
self.ipaddrs = []
print "image:"
sxp.show(self.image)
print
- for dl in self.devices:
- for dev in dl:
- print "device:"
- sxp.show(dev)
- print
for val in self.configs:
print "config:"
sxp.show(val)
at creation time, for example when it uses NFS root.
"""
- blkif = self.getDeviceController("blkif", error=False)
+ blkif = self.getDeviceController("vbd", error=False)
if not blkif:
- blkif = self.createDeviceController("blkif")
+ blkif = self.createDeviceController("vbd")
backend = blkif.getBackend(0)
backend.connect(recreate=self.recreate)
controller.addDevControllerClass("console", console.ConsoleController)
from server import blkif
-controller.addDevControllerClass("blkif", blkif.BlkifController)
-add_device_handler("vbd", "blkif")
+controller.addDevControllerClass("vbd", blkif.BlkifController)
+add_device_handler("vbd", "vbd")
from server import netif
-controller.addDevControllerClass("netif", netif.NetifController)
-add_device_handler("vif", "netif")
+controller.addDevControllerClass("vif", netif.NetifController)
+add_device_handler("vif", "vif")
from server import pciif
-controller.addDevControllerClass("pciif", pciif.PciController)
-add_device_handler("pci", "pciif")
+controller.addDevControllerClass("pci", pciif.PciController)
+add_device_handler("pci", "pci")
from xen.xend.server import usbif
-controller.addDevControllerClass("usbif", usbif.UsbifController)
-add_device_handler("usb", "usbif")
+controller.addDevControllerClass("usb", usbif.UsbifController)
+add_device_handler("usb", "usb")
#============================================================================
self.lock = threading.Lock()
self.schedule = {}
- def later(self, _delay, _name, _fn, args):
+ def later(self, _delay, _name, _fn, args, kwargs={}):
"""Schedule a function to be called later (if not already scheduled).
@param _delay: delay in seconds
@param _name: schedule name
@param _fn: function
- @param args: arguments
+ @param args: arguments (list)
+ @param kwargs keyword arguments (map)
"""
try:
self.lock.acquire()
if self.schedule.get(_name): return
- timer = threading.Timer(_delay, _fn, args=args)
+ runargs = [ _name, _fn, args, kwargs ]
+ timer = threading.Timer(_delay, self._run, args=runargs)
self.schedule[_name] = timer
finally:
self.lock.release()
@param name: schedule name to cancel
"""
+ timer = self._remove(name)
+ if timer:
+ timer.cancel()
+
+ def _remove(self, name):
try:
self.lock.acquire()
timer = self.schedule.get(name)
- if not timer:
- return
- del self.schedule[name]
+ if timer:
+ del self.schedule[name]
+ return timer
finally:
self.lock.release()
- timer.cancel()
+
+ def _run(self, name, fn, args, kwargs):
+ self._remove(name)
+ fn(*args, **kwargs)
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-import cgi
-
-import os
-import sys
-import types
-import StringIO
-
-from twisted.internet import defer
-from twisted.internet import reactor
-from twisted.protocols import http
-from twisted.web import error
-from twisted.web import resource
-from twisted.web import server
-from twisted.python.failure import Failure
-
-from xen.xend import sxp
-from xen.xend import PrettyPrint
-from xen.xend.Args import ArgError
-from xen.xend.XendError import XendError
-from xen.xend.XendLogging import log
-
-def uri_pathlist(p):
- """Split a path into a list.
- p path
- return list of path elements
- """
- l = []
- for x in p.split('/'):
- if x == '': continue
- l.append(x)
- return l
-
-class SrvBase(resource.Resource):
- """Base class for services.
- """
-
- def parse_form(self, req, method):
- """Parse the data for a request, GET using the URL, POST using encoded data.
- Posts should use enctype='multipart/form-data' in the <form> tag,
- rather than 'application/x-www-form-urlencoded'. Only 'multipart/form-data'
- handles file upload.
-
- req request
- returns a cgi.FieldStorage instance
- """
- env = {}
- env['REQUEST_METHOD'] = method
- if self.query:
- env['QUERY_STRING'] = self.query
- val = cgi.FieldStorage(fp=req.rfile, headers=req.headers, environ=env)
- return val
-
- def use_sxp(self, req):
- """Determine whether to send an SXP response to a request.
- Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept.
-
- req request
- returns 1 for SXP, 0 otherwise
- """
- ok = 0
- user_agent = req.getHeader('User-Agent')
- accept = req.getHeader('Accept')
- if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0):
- ok = 1
- return ok
-
- def get_op_method(self, op):
- """Get the method for an operation.
- For operation 'foo' looks for 'op_foo'.
-
- op operation name
- returns method or None
- """
- op_method_name = 'op_' + op
- return getattr(self, op_method_name, None)
-
- def perform(self, req):
- """General operation handler for posted operations.
- For operation 'foo' looks for a method op_foo and calls
- it with op_foo(op, req). Replies with code 500 if op_foo
- is not found.
-
- The method must return a list when req.use_sxp is true
- and an HTML string otherwise (or list).
- Methods may also return a Deferred (for incomplete processing).
-
- req request
- """
- op = req.args.get('op')
- if op is None or len(op) != 1:
- req.setResponseCode(http.NOT_ACCEPTABLE, "Invalid request")
- return ''
- op = op[0]
- op_method = self.get_op_method(op)
- if op_method is None:
- req.setResponseCode(http.NOT_IMPLEMENTED, "Operation not implemented: " + op)
- req.setHeader("Content-Type", "text/plain")
- req.write("Operation not implemented: " + op)
- return ''
- else:
- return self._perform(op, op_method, req)
-
- def _perform(self, op, op_method, req):
- try:
- val = op_method(op, req)
- except Exception, err:
- self._perform_err(err, op, req)
- return ''
-
- if isinstance(val, defer.Deferred):
- val.addCallback(self._perform_cb, op, req, dfr=1)
- val.addErrback(self._perform_err, op, req, dfr=1)
- return server.NOT_DONE_YET
- else:
- self._perform_cb(val, op, req, dfr=0)
- return ''
-
- def _perform_cb(self, val, op, req, dfr=0):
- """Callback to complete the request.
- May be called from a Deferred.
-
- @param err: the error
- @param req: request causing the error
- @param dfr: deferred flag
- """
- if isinstance(val, error.ErrorPage):
- req.write(val.render(req))
- elif self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- sxp.show(val, out=req)
- else:
- req.write('<html><head></head><body>')
- self.print_path(req)
- if isinstance(val, types.ListType):
- req.write('<code><pre>')
- PrettyPrint.prettyprint(val, out=req)
- req.write('</pre></code>')
- else:
- req.write(str(val))
- req.write('</body></html>')
- if dfr:
- req.finish()
-
- def _perform_err(self, err, op, req, dfr=0):
- """Error callback to complete a request.
- May be called from a Deferred.
-
- @param err: the error
- @param req: request causing the error
- @param dfr: deferred flag
- """
- if isinstance(err, Failure):
- err = err.getErrorMessage()
- elif not (isinstance(err, ArgError) or
- isinstance(err, sxp.ParseError) or
- isinstance(err, XendError)):
- if dfr:
- return err
- else:
- raise
- log.exception("op=%s: %s", op, str(err))
- if self.use_sxp(req):
- req.setHeader("Content-Type", sxp.mime_type)
- sxp.show(['xend.err', str(err)], out=req)
- else:
- req.setHeader("Content-Type", "text/plain")
- req.write('Error ')
- req.write(': ')
- req.write(str(err))
- if dfr:
- req.finish()
-
-
- def print_path(self, req):
- """Print the path with hyperlinks.
- """
- pathlist = [x for x in req.prepath if x != '' ]
- s = "/"
- req.write('<h1><a href="/">/</a>')
- for x in pathlist:
- s += x + "/"
- req.write(' <a href="%s">%s</a>/' % (s, x))
- req.write("</h1>")
+from xen.web.SrvBase import *
import traceback
import time
-from twisted.internet import pollreactor
-pollreactor.install()
-
+#from twisted.internet import pollreactor; pollreactor.install()
from twisted.internet import reactor
-from twisted.internet import protocol
-from twisted.internet import abstract
-from twisted.internet import defer
from xen.lowlevel import xu
from xen.xend import sxp
from xen.xend import PrettyPrint
-from xen.xend import EventServer
-eserver = EventServer.instance()
+from xen.xend import EventServer; eserver = EventServer.instance()
from xen.xend.XendError import XendError
from xen.xend.server import SrvServer
from xen.xend import XendRoot
log.info("Xend Daemon started")
self.createFactories()
self.listenEvent(xroot)
- self.listenVirq()
self.listenChannels()
- SrvServer.create(bridge=1)
+ serverthread = SrvServer.create(bridge=1)
self.daemonize()
+ print 'running serverthread...'
+ serverthread.start()
+ print 'running reactor...'
reactor.run()
except Exception, ex:
print >>sys.stderr, 'Exception starting xend:', ex
+ if DEBUG:
+ traceback.print_exc()
+ log.exception("Exception starting xend")
self.exit(1)
-
def createFactories(self):
self.channelF = channel.channelFactory()
return event.listenEvent(self, port, interface)
def listenChannels(self):
- self.channelF.start()
-
- def listenVirq(self):
def virqReceived(virq):
print 'virqReceived>', virq
eserver.inject('xend.virq', virq)
+
self.channelF.setVirqHandler(virqReceived)
+ self.channelF.start()
def exit(self, rc=0):
reactor.disconnectAll()
self.channelF.stop()
- sys.exit(rc)
+ # Calling sys.exit() raises a SystemExit exception, which only
+ # kills the current thread. Calling os._exit() makes the whole
+ # Python process exit immediately. There doesn't seem to be another
+ # way to exit a Python with running threads.
+ #sys.exit(rc)
+ os._exit(rc)
def instance():
global inst
+++ /dev/null
-# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-from SrvDir import SrvDir
-
-class SrvDeviceDir(SrvDir):
- """Device directory.
- """
-
- pass
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-from twisted.protocols import http
-from twisted.web import error
-
-from xen.xend import sxp
-from xen.xend.XendError import XendError
-
-from SrvBase import SrvBase
-
-class SrvError(error.ErrorPage):
-
- def render(self, request):
- val = error.ErrorPage.render(self, request)
- request.setResponseCode(self.code, self.brief)
- return val
-
-class SrvConstructor:
- """Delayed constructor for sub-servers.
- Does not import the sub-server class or create the object until needed.
- """
-
- def __init__(self, klass):
- """Create a constructor. It is assumed that the class
- should be imported as 'import klass from klass'.
-
- klass name of its class
- """
- self.klass = klass
- self.obj = None
-
- def getobj(self):
- """Get the sub-server object, importing its class and instantiating it if
- necessary.
- """
- if not self.obj:
- exec 'from %s import %s' % (self.klass, self.klass)
- klassobj = eval(self.klass)
- self.obj = klassobj()
- return self.obj
-
-class SrvDir(SrvBase):
- """Base class for directory servlets.
- """
- isLeaf = False
-
- def __init__(self):
- SrvBase.__init__(self)
- self.table = {}
- self.order = []
-
- def noChild(self, msg):
- return SrvError(http.NOT_FOUND, msg, msg)
-
- def getChild(self, x, req):
- if x == '': return self
- try:
- val = self.get(x)
- except XendError, ex:
- return self.noChild(str(ex))
- if val is None:
- return self.noChild('Not found ' + str(x))
- else:
- return val
-
- def get(self, x):
- val = self.table.get(x)
- if val is not None:
- val = val.getobj()
- return val
-
- def add(self, x, xclass = None):
- if xclass is None:
- xclass = 'SrvDir'
- self.table[x] = SrvConstructor(xclass)
- self.order.append(x)
-
- def render_GET(self, req):
- try:
- if self.use_sxp(req):
- req.setHeader("Content-type", sxp.mime_type)
- self.ls(req, 1)
- else:
- req.write('<html><head></head><body>')
- self.print_path(req)
- self.ls(req)
- self.form(req)
- req.write('</body></html>')
- return ''
- except Exception, ex:
- self._perform_err(ex, "GET", req)
-
- def ls(self, req, use_sxp=0):
- url = req.prePathURL()
- if not url.endswith('/'):
- url += '/'
- if use_sxp:
- req.write('(ls ')
- for k in self.order:
- req.write(' ' + k)
- req.write(')')
- else:
- req.write('<ul>')
- for k in self.order:
- v = self.get(k)
- req.write('<li><a href="%s%s">%s</a></li>'
- % (url, k, k))
- req.write('</ul>')
-
- def form(self, req):
- pass
+from xen.web.SrvBase import *
+from xen.web.SrvDir import *
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-from twisted.protocols import http
+from xen.web import http
from xen.xend import sxp
from xen.xend import XendDomain
import traceback
from StringIO import StringIO
-from twisted.protocols import http
-from twisted.web import error
-from twisted.python.failure import Failure
+from xen.web import http
from xen.xend import sxp
from xen.xend import XendDomain
('domain', 'SrvDomainDir' ),
('console', 'SrvConsoleDir' ),
('event', 'SrvEventDir' ),
- ('device', 'SrvDeviceDir' ),
('vnet', 'SrvVnetDir' ),
]
for (name, klass) in self.subdirs:
self.get(name)
xroot.start()
+
+ def __repr__(self):
+ return "<SrvRoot %x %s>" %(id(self), self.table.keys())
+
# todo Support security settings etc. in the config file.
# todo Support command-line args.
-from twisted.web import server, static
-from twisted.web import resource, script
-from twisted.internet import reactor
+from threading import Thread
+
+from xen.web.httpserver import HttpServer
from xen.xend import XendRoot
xroot = XendRoot.instance()
-
from xen.xend import Vifctl
-
from SrvRoot import SrvRoot
+from SrvDir import SrvDir
def create(port=None, interface=None, bridge=0):
if port is None:
interface = xroot.get_xend_address()
if bridge:
Vifctl.network('start')
- root = resource.Resource()
- xend = SrvRoot()
- root.putChild('xend', xend)
- site = server.Site(root)
- reactor.listenTCP(port, site, interface=interface)
-
-def main(port=None, interface=None):
- create(port, interface)
- reactor.run()
-
-
-if __name__ == '__main__':
- main()
+ root = SrvDir()
+ root.putChild('xend', SrvRoot())
+ server = HttpServer(root=root, interface=interface, port=port)
+ thread = Thread(name="XendHttpServer", target=server.run)
+ return thread
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-from twisted.protocols import http
+from xen.web import http
from xen.xend import sxp
from xen.xend import XendDomain
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-from twisted.web import static
+from xen.web import static
from xen.xend import XendRoot
try:
return self.logfile.render(req)
except Exception, ex:
- self._perform_err(ex, req)
+ self._perform_err(ex, 'log', req)
self.configure(self.config, recreate=recreate)
def init(self, recreate=False, reboot=False):
- print 'BlkDev>init>'
self.frontendDomain = self.getDomain()
self.frontendChannel = self.getChannel()
backend = self.getBackend()
self.backendChannel = backend.backendChannel
self.backendId = backend.id
- print 'BlkDev>init<'
def configure(self, config, change=False, recreate=False):
- print 'BlkDev>configure>'
if change:
raise XendError("cannot reconfigure vbd")
self.config = config
except:
raise XendError('invalid backend domain')
- print 'BlkDev>configure<'
return self.config
def attach(self, recreate=False, change=False):
- print 'BlkDev>attach>', self
if recreate:
- print 'attach>', 'recreate=', recreate
node = sxp.child_value(recreate, 'node')
- print 'attach>', 'node=', node
+ print 'BlkDev>attach>', 'recreate=', recreate, 'node=', node
self.setNode(node)
else:
node = Blkctl.block('bind', self.type, self.params)
self.attachBackend()
if change:
self.interfaceChanged()
- print 'BlkDev>attach<', self
def unbind(self):
if self.node is None: return
"""Attach the device to its controller.
"""
- print 'BlkDev>attachBackend>'
self.getBackend().connect()
self.send_be_vbd_create()
- print 'BlkDev>attachBackend<'
def send_be_vbd_create(self):
- print 'BlkDev>send_be_vbd_create>'
msg = packMsg('blkif_be_vbd_create_t',
{ 'domid' : self.frontendDomain,
'blkif_handle' : self.backendId,
self.rcvr = None
def initController(self, recreate=False, reboot=False):
- print 'BlkifController>initController>'
self.destroyed = False
# Add our handlers for incoming requests.
self.rcvr = CtrlMsgRcvr(self.getChannel())
if reboot:
self.rebootBackends()
self.rebootDevices()
- print 'BlkifController>initController<'
def sxpr(self):
val = ['blkif', ['dom', self.getDomain()]]
import socket
-from twisted.internet import reactor, protocol
+from xen.web import reactor, protocol
from xen.lowlevel import xu
self.console = console
self.id = id
self.addr = None
- self.binary = 0
- def connectionMade(self):
+ def connectionMade(self, addr=None):
peer = self.transport.getPeer()
- self.addr = (peer.host, peer.port)
+ self.addr = addr
if self.console.connect(self.addr, self):
self.transport.write("Cannot connect to console %d on domain %d\n"
- % (self.id, self.console.dom))
+ % (self.id, self.console.getDomain()))
self.loseConnection()
return
else:
return len(data)
def connectionLost(self, reason=None):
+ print 'ConsoleProtocol>connectionLost>', reason
log.info("Console disconnected %s %s %s",
str(self.id), str(self.addr[0]), str(self.addr[1]))
eserver.inject('xend.console.disconnect',
STATUS_LISTENING = 'listening'
def __init__(self, controller, id, config, recreate=False):
- print 'Console>'
Dev.__init__(self, controller, id, config)
self.status = self.STATUS_NEW
self.addr = None
[self.id, self.getDomain(), self.console_port])
def init(self, recreate=False, reboot=False):
- print 'Console>init>'
self.destroyed = False
self.channel = self.getChannel()
self.listen()
def destroy(self, change=False, reboot=False):
"""Close the console.
"""
+ print 'ConsoleDev>destroy>', self, reboot
if reboot:
return
self.status = self.STATUS_CLOSED
def listen(self):
"""Listen for TCP connections to the console port..
"""
- if self.closed(): return
+ if self.closed():
+ return
if self.listener:
pass
else:
returns 0 if ok, negative otherwise
"""
- if self.closed(): return -1
- if self.connected(): return -1
+ if self.closed():
+ return -1
+ if self.connected():
+ return -1
self.addr = addr
self.conn = conn
self.status = self.STATUS_CONNECTED
def disconnect(self, conn=None):
"""Disconnect the TCP connection to the console.
"""
+ print 'ConsoleDev>disconnect>', conn
if conn and conn != self.conn: return
if self.conn:
self.conn.loseConnection()
self.rebootDevices()
def destroyController(self, reboot=False):
+ print 'ConsoleController>destroyController>', self, reboot
self.destroyed = True
self.destroyDevices(reboot=reboot)
self.rcvr.deregisterChannel()
console = self.getDevice(0)
if console:
console.receiveOutput(msg)
+ else:
+ log.warning('no console: domain %d', self.getDomain())
def lostChannel(self):
"""Called when the channel to the domain is lost.
"""
- print 'CtrlMsgRcvr>lostChannel>',
+ if DEBUG:
+ print 'CtrlMsgRcvr>lostChannel>',
self.channel = None
def registerChannel(self):
raise NotImplementedError()
def createDevice(self, config, recreate=False, change=False):
- print 'DevController>createDevice>', 'config=', config, 'recreate=', recreate, 'change=', change
dev = self.newDevice(self.nextDeviceId(), config, recreate=recreate)
dev.init(recreate=recreate)
self.addDevice(dev)
idx = self.getDeviceIndex(dev)
recreate = self.vm.get_device_recreate(self.getType(), idx)
dev.attach(recreate=recreate, change=change)
- print 'DevController>createDevice<'
def configureDevice(self, id, config, change=False):
"""Reconfigure an existing device.
def getDeviceIds(self):
return [ dev.getId() for dev in self.device_order ]
+ def getDeviceIndexes(self):
+ return range(0, len(self.device_order))
+
def getDevices(self):
return self.device_order
self.device_order.remove(dev)
def rebootDevices(self):
- print 'DevController>rebootDevices>', self
for dev in self.getDevices():
dev.reboot()
def reboot(self):
"""Reconnect device when the domain is rebooted.
"""
- print 'Dev>reboot>', self
self.init(reboot=True)
self.attach()
+++ /dev/null
-# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-from xen.xend.XendError import XendError
-
-import channel
-import controller
-from messages import *
-
-class DomainControllerFactory(controller.ControllerFactory):
- """Factory for creating domain controllers.
- """
-
- def createController(self, dom):
- """Create a domain controller.
-
- dom domain
-
- returns domain controller
- """
- return DomainController(self, dom)
-
-class DomainController(controller.Controller):
- """Generic controller for a domain.
- Used for domain shutdown.
- """
-
- """Map shutdown reasons to the message type to use.
- """
- reasons = {'poweroff' : 'shutdown_poweroff_t',
- 'reboot' : 'shutdown_reboot_t',
- 'suspend' : 'shutdown_suspend_t',
- 'sysrq' : 'shutdown_sysrq_t' }
-
- def __init__(self, factory, dom):
- controller.Controller.__init__(self, factory, dom)
- self.addMethod(CMSG_SHUTDOWN, 0, None)
- self.addMethod(CMSG_MEM_REQUEST, 0, None)
- self.registerChannel()
-
- def shutdown(self, reason, key=0):
- """Shutdown a domain.
-
- reason shutdown reason
- key sysrq key (only if reason is 'sysrq')
- """
- msgtype = self.reasons.get(reason)
- if not msgtype:
- raise XendError('invalid reason:' + reason)
- extra = {}
- if reason == 'sysrq': extra['key'] = key
- print extra
- self.writeRequest(packMsg(msgtype, extra))
-
- def mem_target_set(self, target):
- """Set domain memory target in pages.
- """
- msg = packMsg('mem_request_t', { 'target' : target * (1 << 8)} )
- self.writeRequest(msg)
-from twisted.internet import reactor, protocol, defer
+import sys
+import StringIO
+
+from xen.web import reactor, protocol
from xen.lowlevel import xu
sxp.show(sxpr, out=io)
print >> io
io.seek(0)
- return self.transport.write(io.getvalue())
+ if self.transport:
+ return self.transport.write(io.getvalue())
+ else:
+ return 0
def send_result(self, res):
return self.send_reply(['ok', res])
def op_info(self, name, req):
val = ['info']
- val += self.daemon.consoles()
- val += self.daemon.blkifs()
- val += self.daemon.netifs()
- val += self.daemon.usbifs()
+ #val += self.daemon.consoles()
+ #val += self.daemon.blkifs()
+ #val += self.daemon.netifs()
+ #val += self.daemon.usbifs()
return val
def op_sys_subscribe(self, name, v):
import controller
controller.DEBUG = (mode == 'on')
-
class EventFactory(protocol.Factory):
"""Asynchronous handler for the event server socket.
"""
proto.factory = self
return proto
-
def listenEvent(daemon, port, interface):
- protocol = EventFactory(daemon)
- return reactor.listenTCP(port, protocol, interface=interface)
-
+ factory = EventFactory(daemon)
+ return reactor.listenTCP(port, factory, interface=interface)
import random
-from twisted.internet import defer
-
from xen.xend import sxp
from xen.xend import Vifctl
from xen.xend.XendError import XendError, VmError
@param id: interface id
@param config: device configuration
@param recreate: recreate flag (true after xend restart)
- @return: deferred
"""
return NetDev(self, id, config, recreate=recreate)